#include "config.h"

#include <sys/types.h>
#include <errno.h>

#define DBG_SUBSYS S_LIBCLUSTER

#include "rpc_proto.h"
#include "net_global.h"
#include "cluster.h"
#include "dispatch.h"
#include "ynet_rpc.h"
#include "job_dock.h"
#include "dbg.h"

/*
 * Operation codes stored in msg_t.op for MSG_DISPATCH requests.
 *
 * Numbering starts at DISPATCH_RPC_NULL (100). The handler tables in this
 * file are sized DISPATCH_RPC_MAX - DISPATCH_RPC_NULL and indexed by
 * (op - DISPATCH_RPC_NULL), so DISPATCH_RPC_MAX has to remain the last
 * enumerator. The values travel in requests between nodes; reordering them
 * changes the numbers both sides use.
 */
typedef enum {
        DISPATCH_RPC_NULL = 100,
        DISPATCH_RPC_NETINFO,
        DISPATCH_RPC_NEWID,
        DISPATCH_RPC_HEARTBEAT,
        DISPATCH_RPC_VOL_NOTIFICTION,
        DISPATCH_RPC_WRITEABLE,
        DISPATCH_RPC_NEWDISK,
        DISPATCH_RPC_SYSSTAT,
        DISPATCH_RPC_POOLDUMP,
        DISPATCH_RPC_ADDNODE,
        DISPATCH_RPC_DELNODE,
        DISPATCH_RPC_LIST_STORAGE_AREA,
        DISPATCH_RPC_CHECK_STORAGE_AREA,
        DISPATCH_RPC_MAX, /* sentinel: one past the last valid op */
} dispatch_rpc_op_t;

/*
 * Pair of an operation code and a count.
 * NOTE(review): nothing in the visible part of this file references this
 * type -- confirm whether it is still needed.
 */
typedef struct {
        uint32_t op;
        uint32_t count;
} dispatch_newinfo_req_t;

/* Local shorthand for the RPC message type used by every request below. */
typedef rpc_msg_t msg_t;

/*
 * Set to 1 at the end of dispatch_rpc_init() and back to 0 by
 * dispatch_rpc_destroy(); __request_handler() answers ENOSYS while it is 0.
 */
static int inited;

/*
 * Per-operation handler function and handler name, both indexed by
 * (op - DISPATCH_RPC_NULL). Filled in by __request_set_handler().
 */
static __request_handler_func__  __request_handler__[DISPATCH_RPC_MAX - DISPATCH_RPC_NULL];
static char  __request_name__[DISPATCH_RPC_MAX - DISPATCH_RPC_NULL][__RPC_HANDLER_NAME__ ];

/*
 * Record the handler function and its name for operation `op`.
 *
 * op   - operation code; must satisfy
 *        DISPATCH_RPC_NULL <= op < DISPATCH_RPC_MAX (asserted).
 * func - handler stored in __request_handler__.
 * name - NUL-terminated handler name copied into __request_name__;
 *        strlen(name) + 1 must be below __RPC_HANDLER_NAME__ (asserted).
 */
static void __request_set_handler(int op, __request_handler_func__ func, const char *name)
{
        int idx = op - DISPATCH_RPC_NULL;

        /* both tables have exactly DISPATCH_RPC_MAX - DISPATCH_RPC_NULL slots */
        YASSERT(op >= DISPATCH_RPC_NULL && op < DISPATCH_RPC_MAX);
        YASSERT(strlen(name) + 1 < __RPC_HANDLER_NAME__ );

        strcpy(__request_name__[idx], name);
        __request_handler__[idx] = func;
}

/*
 * Look up the handler function and handler name registered for `op`.
 *
 * op   - operation code taken from an incoming request.
 * func - out: registered handler, or NULL when none is registered or `op`
 *        is outside [DISPATCH_RPC_NULL, DISPATCH_RPC_MAX).
 * name - out: registered handler name; NULL when `op` is out of range.
 *
 * `op` comes straight from a received message, so it is range-checked
 * before it is used as a table index.
 */
static void __request_get_handler(int op, __request_handler_func__ *func, char **name)
{
        if (op < DISPATCH_RPC_NULL || op >= DISPATCH_RPC_MAX) {
                *func = NULL;
                *name = NULL;
                return;
        }

        *func = __request_handler__[op - DISPATCH_RPC_NULL];
        *name = __request_name__[op - DISPATCH_RPC_NULL];
}

/*
 * Copy a whole received message out of `buf` into the flat buffer `_buf`.
 *
 * buf    - received message; its length must not exceed MEM_CACHE_SIZE4K
 *          (asserted).
 * _req   - out: `_buf` viewed as a msg_t.
 * buflen - out: number of bytes that follow the msg_t header.
 * _buf   - destination storage supplied by the caller.
 */
static void __getmsg(buffer_t *buf, msg_t **_req, int *buflen, char *_buf)
{
        msg_t *hdr = (void *)_buf;

        YASSERT(buf->len <= MEM_CACHE_SIZE4K);

        /* payload length is computed before the copy, as in the call order below */
        *buflen = buf->len - sizeof(*hdr);
        mbuffer_get(buf, hdr, buf->len);

        *_req = hdr;
}

/*
 * Handler for DISPATCH_RPC_WRITEABLE.
 *
 * Decodes a nid_t pointer and an int count from the request, passes them to
 * dispatch_srv_writeable() and sends an empty reply on success.
 *
 * Returns 0 on success, otherwise the error from dispatch_srv_writeable()
 * (no reply is sent here in that case).
 */
static int __dispatch_srv_writeable(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, payload_len;
        int *count;
        nid_t *diskid;
        msg_t *msg;
        char *page;

        page = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        __getmsg(_buf, &msg, &payload_len, page);

        _opaque_decode(msg->buf, payload_len,
                       &diskid, NULL,
                       &count, NULL,
                       NULL);

        ret = dispatch_srv_writeable(diskid, *count);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);

err_ret:
        /* common exit: ret is 0 when reached by falling through */
        mem_cache_free(MEM_CACHE_4K, page);
        return ret;
}

/*
 * Handler for DISPATCH_RPC_NETINFO.
 *
 * Decodes the target nid from the request, asks dispatch_srv_netinfo() to
 * fill a ynet_net_info_t and replies with info->len bytes of it.
 *
 * Returns 0 on success, otherwise the error from dispatch_srv_netinfo().
 */
static int __dispatch_srv_netinfo(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, payload_len;
        msg_t *msg;
        nid_t *nid;
        ynet_net_info_t *info;
        char *page;

        page = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        /* request occupies the start of the page, the reply its upper half */
        info = (void *)(page + PAGE_SIZE / 2);

        __getmsg(_buf, &msg, &payload_len, page);

        _opaque_decode(msg->buf, payload_len, &nid, NULL, NULL);

        ret = dispatch_srv_netinfo(info, nid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, info, info->len);

err_ret:
        /* common exit: ret is 0 when reached by falling through */
        mem_cache_free(MEM_CACHE_4K, page);
        return ret;
}

/*
 * Handler for DISPATCH_RPC_NEWID.
 *
 * The request carries no payload, so `_buf` is ignored. Obtains a chkid_t
 * from dispatch_srv_newid() and sends it back as the reply.
 *
 * Returns 0 on success, otherwise the error from dispatch_srv_newid().
 */
static int __dispatch_srv_newid(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret;
        chkid_t newid;

        (void) _buf;

        ret = dispatch_srv_newid(&newid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        rpc_reply(sockid, msgid, &newid, sizeof(newid));

err_ret:
        /* ret is 0 when reached by falling through */
        return ret;
}

/*
 * Handler for DISPATCH_RPC_HEARTBEAT.
 *
 * Decodes four fields from the request (name string, dfinfo string,
 * nodestat_t, status string), hands them to dispatch_srv_heartbeat() and
 * replies with the uint32_t that call fills in.
 *
 * Returns 0 on success, otherwise the error from dispatch_srv_heartbeat().
 */
static int __dispatch_srv_heartbeat(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, payload_len;
        uint32_t version;
        msg_t *msg;
        nodestat_t *stat;
        const char *node_name, *dfinfo, *status;
        char *page;

        page = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        __getmsg(_buf, &msg, &payload_len, page);

        _opaque_decode(msg->buf, payload_len,
                       &node_name, NULL,
                       &dfinfo, NULL,
                       &stat, NULL,
                       &status, NULL,
                       NULL);

        ret = dispatch_srv_heartbeat(node_name, dfinfo, stat, status, &version);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        rpc_reply(sockid, msgid, &version, sizeof(version));

err_ret:
        /* common exit: ret is 0 when reached by falling through */
        mem_cache_free(MEM_CACHE_4K, page);
        return ret;
}

/*
 * Handler for DISPATCH_RPC_VOL_NOTIFICTION.
 *
 * Decodes a name string, a uint32_t options word and a local_vol_t from the
 * request, forwards them to dispatch_srv_vol_notifiction() and sends an
 * empty reply on success.
 *
 * Returns 0 on success, otherwise the error from
 * dispatch_srv_vol_notifiction().
 */
static int __dispatch_srv_vol_notifiction(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, payload_len, options_len;
        uint32_t *options;
        msg_t *msg;
        local_vol_t *vols = NULL;
        const char *node_name;
        char *page;

        page = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        __getmsg(_buf, &msg, &payload_len, page);

        _opaque_decode(msg->buf, payload_len,
                       &node_name, NULL,
                       &options, &options_len,
                       &vols, NULL,
                       NULL);

        /* the decoded length of the options field is not needed */
        (void) options_len;

        ret = dispatch_srv_vol_notifiction(node_name, vols, *options);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        rpc_reply(sockid, msgid, NULL, 0);

err_ret:
        /* common exit: ret is 0 when reached by falling through */
        mem_cache_free(MEM_CACHE_4K, page);
        return ret;
}

/*
 * Handler for DISPATCH_RPC_NEWDISK.
 *
 * Decodes, in order: pool name, replica count, skip list, skip list entry
 * count and flag. Calls dispatch_srv_newdisk() and replies with *repnum
 * diskid_t entries.
 *
 * Returns 0 on success; EINVAL when the requested replica count does not
 * fit the local result array; otherwise the error from
 * dispatch_srv_newdisk().
 */
static int __dispatch_srv_newdisk(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const uint32_t *repnum, *skip_count, *flag;
        const nid_t *skip;
        diskid_t newdisk[LICH_REPLICA_MAX];
        const char *pool;
        uint32_t tmp;

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen,
                       &pool, NULL,
                       &repnum, NULL,
                       &skip, &tmp,
                       &skip_count, NULL,
                       &flag, NULL,
                       NULL);

        /*
         * *repnum is supplied by the peer and sizes both the result written
         * into newdisk[] and the reply below; reject values the array
         * cannot hold.
         */
        if (unlikely(*repnum > LICH_REPLICA_MAX)) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        /*
         * skip_count is a pointer into the request and is never NULL here;
         * the decision has to be made on the decoded value. An empty skip
         * list is passed on as NULL.
         */
        skip = *skip_count ? skip : NULL;

        ret = dispatch_srv_newdisk(pool, newdisk, *repnum, skip, *skip_count, *flag);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        rpc_reply(sockid, msgid, &newdisk,
                  sizeof(diskid_t) * (*repnum));

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

/*
 * Handler for DISPATCH_RPC_SYSSTAT.
 *
 * Decodes the `force` value from the request, lets dispatch_srv_sysstat()
 * fill a lich_stat_t and replies with that structure.
 *
 * Returns 0 on success, otherwise the error from dispatch_srv_sysstat().
 */
static int __dispatch_srv_sysstat(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, payload_len;
        int *force;
        lich_stat_t stat;
        msg_t *msg;
        char *page;

        page = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        __getmsg(_buf, &msg, &payload_len, page);

        _opaque_decode(msg->buf, payload_len, &force, NULL, NULL);

        ret = dispatch_srv_sysstat(&stat, *force);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        rpc_reply(sockid, msgid, &stat, sizeof(stat));

err_ret:
        /* common exit: ret is 0 when reached by falling through */
        mem_cache_free(MEM_CACHE_4K, page);
        return ret;
}

/*
 * Handler for DISPATCH_RPC_POOLDUMP.
 *
 * Decodes the pool name from the request, calls dispatch_srv_pooldump() and
 * sends an empty reply on success.
 *
 * Returns 0 on success, otherwise the error from dispatch_srv_pooldump().
 */
static int __dispatch_srv_pooldump(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, payload_len;
        const char *pool;
        msg_t *msg;
        char *page;

        page = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        __getmsg(_buf, &msg, &payload_len, page);

        _opaque_decode(msg->buf, payload_len, &pool, NULL, NULL);

        ret = dispatch_srv_pooldump(pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        rpc_reply(sockid, msgid, NULL, 0);

err_ret:
        /* common exit: ret is 0 when reached by falling through */
        mem_cache_free(MEM_CACHE_4K, page);
        return ret;
}

/*
 * Handler for DISPATCH_RPC_ADDNODE.
 *
 * Decodes a node name and a nid from the request, calls
 * dispatch_srv_addnode() and replies with a lich_stat_t-sized body.
 *
 * Nothing in this handler fills `stat`; it is zeroed so the reply keeps its
 * size without sending uninitialized stack bytes to the peer.
 * (dispatch_rpc_addnode() in this file passes no reply buffer.)
 *
 * Returns 0 on success, otherwise the error from dispatch_srv_addnode().
 */
static int __dispatch_srv_addnode(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        lich_stat_t stat;
        char *name;
        const nid_t *nid;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        memset(&stat, 0x0, sizeof(stat));

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &name, NULL, &nid, NULL, NULL);

        ret = dispatch_srv_addnode(name, nid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        rpc_reply(sockid, msgid, &stat, sizeof(stat));

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

/*
 * Handler for DISPATCH_RPC_DELNODE.
 *
 * Decodes a node name from the request, calls dispatch_srv_delnode() and
 * replies with a lich_stat_t-sized body.
 *
 * Nothing in this handler fills `stat`; it is zeroed so the reply keeps its
 * size without sending uninitialized stack bytes to the peer.
 * (dispatch_rpc_delnode() in this file passes no reply buffer.)
 *
 * Returns 0 on success, otherwise the error from dispatch_srv_delnode().
 */
static int __dispatch_srv_delnode(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        lich_stat_t stat;
        char *name;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        memset(&stat, 0x0, sizeof(stat));

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &name, NULL, NULL);

        ret = dispatch_srv_delnode(name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        rpc_reply(sockid, msgid, &stat, sizeof(stat));

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

/*
 * Handler for DISPATCH_RPC_CHECK_STORAGE_AREA.
 *
 * Decodes the site name from the request, calls
 * dispatch_srv_check_storage_area() and sends an empty reply on success.
 *
 * Returns 0 on success, otherwise the error from
 * dispatch_srv_check_storage_area().
 */
static int __dispatch_srv_check_storage_area(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, payload_len;
        char *site_name;
        msg_t *msg;
        char *page;

        page = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        __getmsg(_buf, &msg, &payload_len, page);

        _opaque_decode(msg->buf, payload_len, &site_name, NULL, NULL);

        ret = dispatch_srv_check_storage_area(site_name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        rpc_reply(sockid, msgid, NULL, 0);

err_ret:
        /* common exit: ret is 0 when reached by falling through */
        mem_cache_free(MEM_CACHE_4K, page);
        return ret;
}

/*
 * Handler for DISPATCH_RPC_LIST_STORAGE_AREA.
 *
 * The request carries no payload, so `_buf` is ignored. A zeroed page is
 * handed to dispatch_srv_list_storage_area(), which fills it and reports a
 * byte count; that many bytes are sent back as the reply.
 * NOTE(review): the callee is given no buffer size -- presumably it is
 * expected to stay within PAGE_SIZE; confirm against its implementation.
 *
 * Returns 0 on success, otherwise the error from
 * dispatch_srv_list_storage_area().
 */
static int __dispatch_srv_list_storage_area(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, count;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        (void)_buf;

        ret = dispatch_srv_list_storage_area(buf, &count);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        rpc_reply(sockid, msgid, buf, count);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

/*
 * Send a DISPATCH_RPC_LIST_STORAGE_AREA request to `nid` and wait for the
 * reply.
 *
 * nid    - node the request is sent to.
 * _buf   - reply buffer handed to rpc_request_wait().
 * _count - reply length argument handed to rpc_request_wait().
 *
 * The request consists of the msg_t header only. Returns 0 on success,
 * otherwise the error from rpc_request_wait().
 */
int dispatch_rpc_list_storage_area(const nid_t *nid, void *_buf, int *_count)
{
        int ret;
        uint32_t payload_len = 0;       /* no payload after the header */
        char *page = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *msg = (void *)page;

        msg->op = DISPATCH_RPC_LIST_STORAGE_AREA;

        ret = rpc_request_wait("dispatch_rpc_list_storage_area", nid,
                               msg, sizeof(*msg) + payload_len,
                               _buf, _count,
                               MSG_DISPATCH, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

err_ret:
        /* common exit: ret is 0 when reached by falling through */
        mem_cache_free(MEM_CACHE_4K, page);
        return ret;
}

/*
 * Send a DISPATCH_RPC_CHECK_STORAGE_AREA request carrying `site_name`
 * (including its terminating NUL) to `nid` and wait for the reply.
 *
 * No reply body is collected. Returns 0 on success, otherwise the error
 * from rpc_request_wait().
 */
int dispatch_rpc_check_storage_area(const nid_t *nid, const char *site_name)
{
        int ret;
        uint32_t payload_len;
        char *page = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *msg = (void *)page;

        msg->op = DISPATCH_RPC_CHECK_STORAGE_AREA;
        _opaque_encode(msg->buf, &payload_len,
                       site_name, strlen(site_name) + 1,
                       NULL);

        ret = rpc_request_wait("dispatch_rpc_check_storage_area", nid,
                               msg, sizeof(*msg) + payload_len,
                               NULL, NULL,
                               MSG_DISPATCH, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

err_ret:
        /* common exit: ret is 0 when reached by falling through */
        mem_cache_free(MEM_CACHE_4K, page);
        return ret;
}

/*
 * Send a DISPATCH_RPC_WRITEABLE request to `nid` and wait for the reply.
 *
 * nid    - node the request is sent to.
 * diskid - array of `_count` nid_t entries, encoded as the first field.
 * _count - number of entries in `diskid`, encoded as the second field.
 *
 * No reply body is collected. Returns 0 on success, otherwise the error
 * from rpc_request_wait().
 */
int dispatch_rpc_writeable(const nid_t *nid, const nid_t *diskid, int _count)
{
        int ret;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;

        req = (void *)buf;
        req->op = DISPATCH_RPC_WRITEABLE;

        /*
         * Every field is a (pointer, length) pair, so the count has to be
         * passed by address; the receiving handler decodes it as an int.
         */
        _opaque_encode(req->buf, &count,
                       diskid, sizeof(*diskid) * _count,
                       &_count, sizeof(_count),
                       NULL);

        ret = rpc_request_wait("dispatch_rpc_writeable", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_DISPATCH, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

/*
 * Send a DISPATCH_RPC_NETINFO request for `target` to `nid` and wait for
 * the reply.
 *
 * nid    - node the request is sent to.
 * info   - reply buffer handed to rpc_request_wait(); no length is passed
 *          alongside it.
 * target - nid encoded as the only request field.
 *
 * Returns 0 on success, otherwise the error from rpc_request_wait().
 */
int dispatch_rpc_netinfo(const nid_t *nid, ynet_net_info_t *info, const nid_t *target)
{
        int ret;
        uint32_t payload_len;
        char *page = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *msg = (void *)page;

        msg->op = DISPATCH_RPC_NETINFO;
        _opaque_encode(msg->buf, &payload_len,
                       target, sizeof(*target),
                       NULL);

        ret = rpc_request_wait("dispatch_rpc_netinfo", nid,
                               msg, sizeof(*msg) + payload_len,
                               info, NULL,
                               MSG_DISPATCH, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

err_ret:
        /* common exit: ret is 0 when reached by falling through */
        mem_cache_free(MEM_CACHE_4K, page);
        return ret;
}

/*
 * Send a DISPATCH_RPC_NEWID request to `nid` and wait for the reply.
 *
 * nid   - node the request is sent to.
 * chkid - reply buffer handed to rpc_request_wait().
 *
 * The request consists of the msg_t header only. Returns 0 on success,
 * otherwise the error from rpc_request_wait().
 */
int dispatch_rpc_newid(const nid_t *nid, chkid_t *chkid)
{
        int ret;
        uint32_t payload_len = 0;       /* no payload after the header */
        char *page = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *msg = (void *)page;

        msg->op = DISPATCH_RPC_NEWID;

        ret = rpc_request_wait("dispatch_rpc_newid", nid,
                               msg, sizeof(*msg) + payload_len,
                               chkid, NULL,
                               MSG_DISPATCH, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

err_ret:
        /* common exit: ret is 0 when reached by falling through */
        mem_cache_free(MEM_CACHE_4K, page);
        return ret;
}

/*
 * Send a DISPATCH_RPC_HEARTBEAT request to `nid` and wait for the reply.
 *
 * nid          - node the request is sent to.
 * name, dfinfo - NUL-terminated strings, encoded with their terminators.
 * stat         - node statistics; stat->nid must not be null (asserted).
 * status       - NUL-terminated string, encoded with its terminator.
 * admin_uptime - reply buffer handed to rpc_request_wait().
 *
 * Returns 0 on success, otherwise the error from rpc_request_wait().
 */
int dispatch_rpc_heartbeat(const nid_t *nid, const char *name, const char *dfinfo,
                           const nodestat_t *stat, const char *status, uint32_t *admin_uptime)
{
        int ret;
        uint32_t payload_len;
        char *page = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *msg;

        YASSERT(!net_isnull(&stat->nid));

        msg = (void *)page;
        msg->op = DISPATCH_RPC_HEARTBEAT;

        /* field order must match the decode in the heartbeat handler */
        _opaque_encode(msg->buf, &payload_len,
                       name, strlen(name) + 1,
                       dfinfo, strlen(dfinfo) + 1,
                       stat, sizeof(*stat),
                       status, strlen(status) + 1,
                       NULL);

        ret = rpc_request_wait("dispatch_rpc_heartbeat", nid,
                               msg, sizeof(*msg) + payload_len,
                               admin_uptime, NULL,
                               MSG_DISPATCH, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

err_ret:
        /* common exit: ret is 0 when reached by falling through */
        mem_cache_free(MEM_CACHE_4K, page);
        return ret;
}

/*
 * Send a DISPATCH_RPC_VOL_NOTIFICTION request to `nid` and wait for the
 * reply.
 *
 * nid     - node the request is sent to.
 * name    - NUL-terminated string, encoded with its terminator.
 * vols    - local_vol_t followed by vols->volcount chkid_t entries.
 * options - option word, encoded as a uint32_t.
 *
 * No reply body is collected. Returns 0 on success, otherwise the error
 * from rpc_request_wait().
 *
 * NOTE(review): the encoded size grows with vols->volcount and is not
 * checked against the PAGE_SIZE request buffer here -- confirm callers
 * bound volcount.
 */
int dispatch_rpc_vol_notifiction(const nid_t *nid, const char *name, const local_vol_t *vols, uint32_t options)
{
        int ret;
        uint32_t payload_len;
        char *page = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *msg = (void *)page;

        msg->op = DISPATCH_RPC_VOL_NOTIFICTION;
        _opaque_encode(msg->buf, &payload_len,
                       name, strlen(name) + 1,
                       &options, sizeof(uint32_t),
                       vols, sizeof(local_vol_t) + sizeof(chkid_t) * vols->volcount,
                       NULL);

        DINFO("local_vol notifiction %llu\n", (LLU)vols->volcount);

        ret = rpc_request_wait("dispatch_rpc_vol_notifiction", nid,
                               msg, sizeof(*msg) + payload_len,
                               NULL, NULL,
                               MSG_DISPATCH, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

err_ret:
        /* common exit: ret is 0 when reached by falling through */
        mem_cache_free(MEM_CACHE_4K, page);
        return ret;
}

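/*
  NOTE: the parameter list below does not match the definition of
  dispatch_rpc_newdisk() that follows; it appears to describe a different
  (possibly earlier or planned) signature.

  const nid_t *nid, const char *pool, diskid_t *diskid,
  uint32_t *_disk_count, const char *site_name,
  const nid_t *_inrack, const diskid_t *skip, uint32_t skip_count, int force)
*/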

/*
 * Send a DISPATCH_RPC_NEWDISK request to `nid` and wait for the reply.
 *
 * nid        - node the request is sent to.
 * pool       - NUL-terminated pool name, encoded with its terminator.
 * diskid     - reply buffer handed to rpc_request_wait().
 * repnum     - replica count, encoded as a uint32_t.
 * _skip      - optional skip list; when NULL, skip_count must be 0
 *              (asserted), and when non-NULL, skip_count must be non-zero
 *              (asserted).
 * skip_count - number of entries in `_skip`.
 * flag       - encoded as an int.
 *
 * Returns 0 on success, otherwise the error from rpc_request_wait().
 *
 * NOTE(review): the reply length passed to rpc_request_wait() starts at
 * MAX_MSG_SIZE rather than the size of `diskid` -- confirm how that
 * argument is interpreted.
 */
int dispatch_rpc_newdisk(const nid_t *nid, const char *pool, diskid_t *diskid, uint32_t repnum,
                         const nid_t *_skip, uint32_t skip_count, int flag)
{
        int ret;
        int replen = MAX_MSG_SIZE;
        uint32_t payload_len;
        nid_t zero_nid;
        const nid_t *skip;
        char *page = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *msg = (void *)page;

        if (_skip == NULL) {
                /* nothing to skip: encode a zero-length field from a dummy nid */
                memset(&zero_nid, 0x0, sizeof(zero_nid));
                skip = &zero_nid;
                YASSERT(!skip_count);
        } else {
                skip = _skip;
                YASSERT(skip_count);
        }

        msg->op = DISPATCH_RPC_NEWDISK;

        /* field order must match the decode in the newdisk handler */
        _opaque_encode(msg->buf, &payload_len,
                       pool, strlen(pool) + 1,
                       &repnum, sizeof(repnum),
                       skip, sizeof(*skip) * skip_count,
                       &skip_count, sizeof(skip_count),
                       &flag, sizeof(flag),
                       NULL);

        ret = rpc_request_wait("dispatch_rpc_newdisk", nid,
                               msg, sizeof(*msg) + payload_len,
                               diskid, &replen,
                               MSG_DISPATCH, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

err_ret:
        /* common exit: ret is 0 when reached by falling through */
        mem_cache_free(MEM_CACHE_4K, page);
        return ret;
}

/*
 * Send a DISPATCH_RPC_SYSSTAT request to `nid` and wait for the reply.
 *
 * nid   - node the request is sent to.
 * stat  - reply buffer handed to rpc_request_wait().
 * force - encoded as the only request field.
 *
 * Returns 0 on success, otherwise the error from rpc_request_wait().
 */
int dispatch_rpc_sysstat(const nid_t *nid, lich_stat_t *stat, uint32_t force)
{
        int ret;
        uint32_t payload_len;
        char *page = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *msg = (void *)page;

        msg->op = DISPATCH_RPC_SYSSTAT;
        _opaque_encode(msg->buf, &payload_len,
                       &force, sizeof(force),
                       NULL);

        ret = rpc_request_wait("dispatch_rpc_sysstat", nid,
                               msg, sizeof(*msg) + payload_len,
                               stat, NULL,
                               MSG_DISPATCH, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

err_ret:
        /* common exit: ret is 0 when reached by falling through */
        mem_cache_free(MEM_CACHE_4K, page);
        return ret;
}

/*
 * Send a DISPATCH_RPC_POOLDUMP request carrying `pool` (including its
 * terminating NUL) to `nid` and wait for the reply.
 *
 * No reply body is collected. Returns 0 on success, otherwise the error
 * from rpc_request_wait().
 */
int dispatch_rpc_pooldump(const nid_t *nid, const char *pool)
{
        int ret;
        uint32_t payload_len;
        char *page = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *msg = (void *)page;

        msg->op = DISPATCH_RPC_POOLDUMP;
        _opaque_encode(msg->buf, &payload_len,
                       pool, strlen(pool) + 1,
                       NULL);

        ret = rpc_request_wait("dispatch_rpc_pooldump", nid,
                               msg, sizeof(*msg) + payload_len,
                               NULL, NULL,
                               MSG_DISPATCH, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

err_ret:
        /* common exit: ret is 0 when reached by falling through */
        mem_cache_free(MEM_CACHE_4K, page);
        return ret;
}

/*
 * Send a DISPATCH_RPC_ADDNODE request to `nid` and wait for the reply.
 *
 * nid    - node the request is sent to.
 * name   - NUL-terminated node name, encoded with its terminator.
 * nodeid - nid encoded as the second field.
 *
 * No reply body is collected. Returns 0 on success, otherwise the error
 * from rpc_request_wait().
 */
int dispatch_rpc_addnode(const nid_t *nid, const char *name, const nid_t *nodeid)
{
        int ret;
        uint32_t payload_len;
        char *page = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *msg = (void *)page;

        msg->op = DISPATCH_RPC_ADDNODE;
        _opaque_encode(msg->buf, &payload_len,
                       name, strlen(name) + 1,
                       nodeid, sizeof(*nodeid),
                       NULL);

        ret = rpc_request_wait("dispatch_rpc_addnode", nid,
                               msg, sizeof(*msg) + payload_len,
                               NULL, NULL,
                               MSG_DISPATCH, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

err_ret:
        /* common exit: ret is 0 when reached by falling through */
        mem_cache_free(MEM_CACHE_4K, page);
        return ret;
}

/*
 * Send a DISPATCH_RPC_DELNODE request carrying `name` (including its
 * terminating NUL) to `nid` and wait for the reply.
 *
 * No reply body is collected. Returns 0 on success, otherwise the error
 * from rpc_request_wait().
 */
int dispatch_rpc_delnode(const nid_t *nid, const char *name)
{
        int ret;
        uint32_t payload_len;
        char *page = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *msg = (void *)page;

        msg->op = DISPATCH_RPC_DELNODE;
        _opaque_encode(msg->buf, &payload_len,
                       name, strlen(name) + 1,
                       NULL);

        ret = rpc_request_wait("dispatch_rpc_delnode", nid,
                               msg, sizeof(*msg) + payload_len,
                               NULL, NULL,
                               MSG_DISPATCH, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

err_ret:
        /* common exit: ret is 0 when reached by falling through */
        mem_cache_free(MEM_CACHE_4K, page);
        return ret;
}

static void __request_handler(void *arg)
{
        int ret;
        msg_t req;
        sockid_t sockid;
        msgid_t msgid;
        buffer_t buf;
        __request_handler_func__ handler;
        char *name;

        request_trans(arg, &sockid, &msgid, &buf, NULL);

        if (buf.len < sizeof(req)) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        if (inited == 0) {
                ret = ENOSYS;
                GOTO(err_ret, ret);
        }

        mbuffer_get(&buf, &req, sizeof(req));

        DBUG("new op %u from %s, id (%u, %x)\n", req.op,
              _inet_ntoa(sockid.addr), msgid.idx, msgid.figerprint);

        __request_get_handler(req.op, &handler, &name);
        if (handler == NULL) {
                ret = ENOSYS;
                DWARN("error op %u\n", req.op);
                GOTO(err_ret, ret);
        }

        schedule_task_setname(name);

        ret = handler(&sockid, &msgid, &buf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mbuffer_free(&buf);

        DBUG("reply op %u from %s, id (%u, %x)\n", req.op,
             _inet_ntoa(sockid.addr), msgid.idx, msgid.figerprint);

        return ;
err_ret:
        mbuffer_free(&buf);
        rpc_reply_error(&sockid, &msgid, ret);
        DBUG("error op %u from %s, id (%u, %x) ret %d\n", req.op,
             _inet_ntoa(sockid.addr), msgid.idx, msgid.figerprint, ret);
        return;
}

int dispatch_rpc_init()
{
        __request_set_handler(DISPATCH_RPC_WRITEABLE, __dispatch_srv_writeable, "dispatch_srv_writeable");
        __request_set_handler(DISPATCH_RPC_NETINFO, __dispatch_srv_netinfo, "dispatch_srv_netinfo");
        __request_set_handler(DISPATCH_RPC_NEWID, __dispatch_srv_newid, "dispatch_srv_newid");
        __request_set_handler(DISPATCH_RPC_HEARTBEAT, __dispatch_srv_heartbeat, "dispatch_srv_heartbeat");
        __request_set_handler(DISPATCH_RPC_VOL_NOTIFICTION, __dispatch_srv_vol_notifiction, "dispatch_srv_vol_notifiction");
        __request_set_handler(DISPATCH_RPC_NEWDISK, __dispatch_srv_newdisk, "dispatch_srv_newdisk");
        __request_set_handler(DISPATCH_RPC_SYSSTAT, __dispatch_srv_sysstat, "dispatch_srv_sysstat");
        __request_set_handler(DISPATCH_RPC_POOLDUMP, __dispatch_srv_pooldump, "dispatch_srv_pooldump");
        __request_set_handler(DISPATCH_RPC_ADDNODE, __dispatch_srv_addnode, "dispatch_srv_addnode");
        __request_set_handler(DISPATCH_RPC_DELNODE, __dispatch_srv_delnode, "dispatch_srv_delnode");
        __request_set_handler(DISPATCH_RPC_LIST_STORAGE_AREA, __dispatch_srv_list_storage_area, "dispatch_srv_list_storage_area");
        __request_set_handler(DISPATCH_RPC_CHECK_STORAGE_AREA, __dispatch_srv_check_storage_area, "dispatch_srv_check_storage_area");
        rpc_request_register(MSG_DISPATCH, __request_handler, NULL);
        inited = 1;

        return 0;
}

/*
 * Undo dispatch_rpc_init(): register a NULL handler for MSG_DISPATCH and
 * clear the `inited` flag so __request_handler() answers ENOSYS again.
 * The handler tables themselves are left as they are.
 */
void dispatch_rpc_destroy()
{
        /* replace the MSG_DISPATCH handler with NULL before clearing the flag */
        rpc_request_register(MSG_DISPATCH, NULL, NULL);
        inited = 0;
}
